package com.cantor.consumer.handler;

import com.cantor.core.message.PingMessage;
import com.cantor.core.message.PongMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.ScheduledFuture;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * Client-side heartbeat handler.
 *
 * <p>Once a channel becomes active this handler periodically writes a
 * {@link PingMessage} to the peer, logs every {@link PongMessage} received,
 * and closes the channel when a reader-idle event is observed.
 *
 * <p>The handler is annotated {@link ChannelHandler.Sharable}, so it keeps no
 * per-channel state in instance fields: the heartbeat task of each channel is
 * captured by a listener on that channel's close future and cancelled there.
 * NOTE(review): {@link IdleStateEvent}s are presumably produced by an
 * {@code IdleStateHandler} installed earlier in the pipeline - confirm against
 * the pipeline setup.
 */
@Slf4j
@ChannelHandler.Sharable
public class PongHandler extends SimpleChannelInboundHandler<PongMessage> {

    /** Delay before the first ping is sent, expressed in {@link #HEART_INTERVAL_UNIT}. */
    private static final long FIRST_HEART_DELAY = 1L;

    /** Period between two consecutive pings, expressed in {@link #HEART_INTERVAL_UNIT}. */
    private static final long HEART_INTERVAL = 5L;

    /** Time unit shared by {@link #FIRST_HEART_DELAY} and {@link #HEART_INTERVAL}. */
    private static final TimeUnit HEART_INTERVAL_UNIT = TimeUnit.SECONDS;

    /**
     * Starts the periodic heartbeat for the channel that just became active and
     * registers its cancellation on the channel's close future.
     *
     * @param ctx the context of the channel that became active
     * @throws Exception if propagating the event to the next handler fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // One task per channel, kept in a local so that a shared handler instance
        // never mixes up (or leaks) the tasks of different channels.
        final ScheduledFuture<?> heartbeat = ctx.executor().scheduleAtFixedRate(
                () -> sendPing(ctx), FIRST_HEART_DELAY, HEART_INTERVAL, HEART_INTERVAL_UNIT);

        // When the channel closes, its own heartbeat task must stop as well.
        ctx.channel().closeFuture().addListener(f -> {
            heartbeat.cancel(false);
            log.debug("Channel {} ------ {} 的定时心跳任务已被取消", ctx.channel().localAddress(), ctx.channel().remoteAddress());
        });

        // Propagate the event to the next handler in the pipeline.
        super.channelActive(ctx);
    }

    /**
     * Logs the receipt of a pong from the peer.
     *
     * @param ctx  the context of the channel the pong arrived on
     * @param pong the received pong message (not inspected)
     * @throws Exception never thrown by this implementation
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PongMessage pong) throws Exception {
        log.debug("{} <<< PONG <<< {}", ctx.channel().localAddress(), ctx.channel().remoteAddress());
    }

    /**
     * Closes the channel on a reader-idle event; every other user event,
     * including the remaining idle states, is forwarded to the next handler.
     *
     * @param ctx the context of the channel the event was fired on
     * @param evt the user event
     * @throws Exception if propagating the event to the next handler fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }
        switch (((IdleStateEvent) evt).state()) {
            case READER_IDLE:
                log.error("客户端{}触发对{}的读空闲,已断开连接", ctx.channel().localAddress(), ctx.channel().remoteAddress());
                ctx.close();
                break;
            default:
                // Not handled here: let later handlers see writer/all idle events.
                super.userEventTriggered(ctx, evt);
                break;
        }
    }

    /** Writes a single ping to the peer and logs the outcome of the write. */
    private void sendPing(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(new PingMessage()).addListener(f -> {
            if (f.isSuccess()) {
                log.debug("{} >>> PING >>> {}", ctx.channel().localAddress(), ctx.channel().remoteAddress());
            } else {
                // Surface failed writes instead of dropping them silently.
                log.warn("{} >>> PING >>> {} failed", ctx.channel().localAddress(), ctx.channel().remoteAddress(), f.cause());
            }
        });
    }
}
